-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(backend): randomizing output uri path to avoid overwriting. Fixes #10186 #11243
fix(backend): randomizing output uri path to avoid overwriting. Fixes #10186 #11243
Conversation
Signed-off-by: b4sus <[email protected]>
Hi @b4sus. Thanks for your PR. I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/ok-to-test
Hey @b4sus , thanks for the contribution! Can you provide a sample pipeline that illustrates the issue this pr is aiming to resolve? At least in the case of a component being re-used, I believe the taskname will have a cc @gmfrasca |
@HumairAK - This appears to only impact output artifacts, and only changes the driver behavior when in CONTAINER driver mode, so I don't believe this should have any effect on #10798 in terms of sub-DAG naming schemes, etc. With that said, I did see that ParallelFor outputs are storing artifacts in the same URI, which is a problem that this PR addresses by adding UUID salts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested this out using a ParallelFor task and confirmed each iteration's output artifacts are given unique URIs which are referenced properly in KFP UI.
/lgtm
Hey @HumairAK , We noticed the problem when, from one pipeline, we started other pipelines (pipeline as component) using ParallelFor. This is roughly the code: @dsl.pipeline
def inner_pipeline(date_to_process: str):
comp1_task = component1(date_to_process = date_to_process)
comp2_task = component2(comp1_task.outputs["output_df"])
@dsl.pipeline
def main_pipeline(from_date: str, to_date: str):
prepare_dates_task = prepare_dates_component(from_date = from_date, to_date = to_date)
with dsl.ParallelFor(items = prepare_dates_task.output, parallelism=4) as date_to_process:
inner_ppln_task = inner_pipeline(date_to_process = date_to_process) In this case, many inner pipelines were started (more then 4 as parallelism is not yet supported) and problem was that output of component1 was/is written to the same minio location, so overwriting each other. And subsequently couple of component2 tasks get the same input, regardless of the argument (date_to_process), producing the same final output (not visible here in code as it is store directly in component). |
Perfect, thanks guys tested and works as well with the following pipeline: pipeline.pyfrom typing import List
from kfp import dsl, compiler
from kfp.dsl import Dataset
from kfp.dsl import Output, InputPath
@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def component1(date_to_process: str, output_df: Output[Dataset]):
with open(output_df.path, 'w') as f:
f.write(date_to_process)
@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def component2(dataset_in: InputPath('Dataset')):
with open(dataset_in, 'r') as input_file:
dataset_one_contents = input_file.read()
print(dataset_one_contents)
@dsl.component(base_image="quay.io/opendatahub/ds-pipelines-ci-executor-image:v1.0")
def prepare_dates_component() -> List[str]:
return ["1", "2", "3", "4", "5", "6"]
@dsl.pipeline
def inner_pipeline(date_to_process: str):
comp1_task = component1(date_to_process = date_to_process).set_caching_options(enable_caching=False)
comp2_task = component2(dataset_in = comp1_task.outputs["output_df"]).set_caching_options(enable_caching=False)
@dsl.pipeline
def main_pipeline():
prepare_dates_task = prepare_dates_component().set_caching_options(enable_caching=False)
with dsl.ParallelFor(items = prepare_dates_task.output, parallelism=4) as date_to_process:
inner_ppln_task = inner_pipeline(date_to_process = date_to_process)
if __name__ == '__main__':
compiler.Compiler().compile(main_pipeline, __file__ + '.yaml') /lgtm |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: HumairAK The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
In driver, random string is added when uri paths for output artifacts are generated. This should ensure that when component of certain name is executed in parallel (either with ParallelFor or just simply calling it multiple times in @pipeline), its outputs are always stored to different paths.